IGNITE-1977 - fixed IgniteSemaphore fault tolerance.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/902bf42c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/902bf42c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/902bf42c Branch: refs/heads/ignite-1192 Commit: 902bf42c36f46b0aaa605b779a699eb8e0c0aca3 Parents: aeacad6 Author: Vladisav Jelisavcic <vladis...@gmail.com> Authored: Tue Apr 11 14:09:12 2017 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Tue Apr 11 14:09:12 2017 +0300 ---------------------------------------------------------------------- .../datastructures/GridCacheSemaphoreImpl.java | 74 +++++++++++++++++--- .../datastructures/GridCacheSemaphoreState.java | 22 ++++++ ...eAbstractDataStructuresFailoverSelfTest.java | 21 ++++-- 3 files changed, 102 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/902bf42c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java index a1c0515..159e735 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@ -232,6 +232,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter /** {@inheritDoc} */ @Override protected final boolean tryReleaseShared(int releases) { + // Fail-fast path. + if(broken) + return true; + // Check if some other node updated the state. 
// This method is called with release==0 only when trying to wake through update. if (releases == 0) @@ -295,6 +299,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter throw new IgniteCheckedException("Failed to find semaphore with given name: " + name); + // Abort if state is already broken. + if (val.isBroken()) { + tx.rollback(); + + return true; + } + boolean retVal = val.getCount() == expVal; if (retVal) { @@ -349,11 +360,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter /** * This method is used for releasing the permits acquired by failing node. + * In case the semaphore is broken, no permits are released and semaphore is set (globally) to broken state. * * @param nodeId ID of the failing node. + * @param broken Flag indicating that this semaphore is broken. * @return True if this is the call that succeeded to change the global state. */ - protected boolean releaseFailedNode(final UUID nodeId) { + protected boolean releaseFailedNode(final UUID nodeId, final boolean broken) { try { return CU.outTx( retryTopologySafe(new Callable<Boolean>() { @@ -369,6 +382,25 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter throw new IgniteCheckedException("Failed to find semaphore with given name: " + name); + // Quit early if semaphore is already broken. + if( val.isBroken()) { + tx.rollback(); + + return false; + } + + // Mark semaphore as broken. No permits are released, + // since semaphore is useless from now on. 
+ if (broken) { + val.setBroken(true); + + semView.put(key, val); + + tx.commit(); + + return true; + } + Map<UUID, Integer> map = val.getWaiters(); if (!map.containsKey(nodeId)) { @@ -473,7 +505,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter tx.commit(); - return new Sync(cnt, waiters, failoverSafe); + Sync sync = new Sync(cnt, waiters, failoverSafe); + + sync.setBroken(val.isBroken()); + + return sync; } } }), @@ -520,6 +556,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter if (sync == null) return; + // Update broken flag. + sync.setBroken(val.isBroken()); + // Update permission count. sync.setPermits(val.getCount()); @@ -535,10 +574,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter int numPermits = sync.getPermitsForNode(nodeId); if (numPermits > 0) { - if (sync.failoverSafe) - // Release permits acquired by threads on failing node. - sync.releaseFailedNode(nodeId); - else { + // Semaphore is broken if reaches this point in non-failover safe mode. + boolean broken = !sync.failoverSafe; + + // Release permits acquired by threads on failing node. + sync.releaseFailedNode(nodeId, broken); + + if (broken) { // Interrupt every waiting thread if this semaphore is not failover safe. sync.setBroken(true); @@ -614,8 +656,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter sync.acquireSharedInterruptibly(permits); - if (isBroken()) + if (isBroken()) { + Thread.interrupted(); // Clear interrupt flag. + throw new InterruptedException(); + } } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -731,8 +776,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter boolean result = sync.nonfairTryAcquireShared(1) >= 0; - if (isBroken()) + if (isBroken()) { + Thread.interrupted(); // Clear interrupt flag. 
+ throw new InterruptedException(); + } return result; } @@ -756,8 +804,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter boolean result = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); - if (isBroken()) + if (isBroken()) { + Thread.interrupted(); // Clear interrupt flag. + throw new InterruptedException(); + } return result; } @@ -825,8 +876,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter boolean result = sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); - if (isBroken()) + if (isBroken()) { + Thread.interrupted(); + throw new InterruptedException(); + } return result; } http://git-wip-us.apache.org/repos/asf/ignite/blob/902bf42c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java index 50cdf10..cdff9c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java @@ -46,6 +46,9 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl /** FailoverSafe flag. */ private boolean failoverSafe; + /** Flag indicating that semaphore is no longer safe to use. */ + private boolean broken; + /** * Constructor. * @@ -101,6 +104,21 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl return failoverSafe; } + /** + * @return broken flag. + */ + public boolean isBroken() { + return broken; + } + + /** + * + * @param broken Flag indicating that this semaphore should be no longer used. 
+ */ + public void setBroken(boolean broken) { + this.broken = broken; + } + /** {@inheritDoc} */ @Override public Object clone() throws CloneNotSupportedException { return super.clone(); @@ -120,6 +138,8 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl out.writeInt(e.getValue()); } } + + out.writeBoolean(broken); } /** {@inheritDoc} */ @@ -135,6 +155,8 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl for (int i = 0; i < size; i++) waiters.put(U.readUuid(in), in.readInt()); } + + broken = in.readBoolean(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/902bf42c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 285ea6e..f918acd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -530,8 +530,6 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ private void doTestSemaphore(ConstantTopologyChangeWorker topWorker, final boolean failoverSafe) throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1977"); - final int permits = topWorker instanceof MultipleTopologyChangeWorker || topWorker instanceof PartitionedMultipleTopologyChangeWorker ? 
TOP_CHANGE_THREAD_CNT * 3 : TOP_CHANGE_CNT; @@ -548,9 +546,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig break; } catch (IgniteInterruptedException e) { - // Exception may happen in non failover safe mode. + // Exception may happen in non failover safe mode. if (failoverSafe) throw e; + else { + // In non-failoverSafe mode semaphore is not safe to be reused, + // and should always be discarded after exception is caught. + break; + } } } @@ -569,6 +572,11 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig // Exception may happen in non failover safe mode. if (failoverSafe) throw e; + else { + // In non-failoverSafe mode semaphore is not safe to be reused, + // and should always be discarded after exception is caught. + break; + } } } @@ -581,8 +589,11 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig fut.get(); - for (Ignite g : G.allGrids()) - assertEquals(permits, g.semaphore(STRUCTURE_NAME, permits, false, false).availablePermits()); + // Semaphore is left in proper state only if failoverSafe mode is used. + if (failoverSafe) { + for (Ignite g : G.allGrids()) + assertEquals(permits, g.semaphore(STRUCTURE_NAME, permits, false, false).availablePermits()); + } } }