IGNITE-5613 - Fixed race on local sequence increment and distributed update
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c08849cd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c08849cd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c08849cd Branch: refs/heads/ignite-2.1.2-exchange Commit: c08849cdb362f1c699afb5d04383fa3200193539 Parents: 3cc13eae Author: Alexey Goncharuk <[email protected]> Authored: Mon Jul 3 17:05:48 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Jul 3 17:06:38 2017 +0300 ---------------------------------------------------------------------- .../GridCacheAtomicSequenceImpl.java | 55 ++++++++++++-------- ...titionedAtomicSequenceMultiThreadedTest.java | 32 ++++++++++++ 2 files changed, 64 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c08849cd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 31ec16f..0354a17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -385,39 +385,48 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc long newUpBound; - curLocVal = locVal; + // Even though we hold a transaction lock here, we must hold the local update lock here as well + // because we mutate multipe variables (locVal and upBound). + localUpdate.lock(); - // If local range was already reserved in another thread. - if (curLocVal + l <= upBound) { - locVal = curLocVal + l; + try { + curLocVal = locVal; - return updated ? curLocVal + l : curLocVal; - } + // If local range was already reserved in another thread. + if (curLocVal + l <= upBound) { + locVal = curLocVal + l; - long curGlobalVal = seq.get(); + return updated ? curLocVal + l : curLocVal; + } - long newLocVal; + long curGlobalVal = seq.get(); - /* We should use offset because we already reserved left side of range.*/ - long off = batchSize > 1 ? batchSize - 1 : 1; + long newLocVal; - // Calculate new values for local counter, global counter and upper bound. - if (curLocVal + l >= curGlobalVal) { - newLocVal = curLocVal + l; + /* We should use offset because we already reserved left side of range.*/ + long off = batchSize > 1 ? batchSize - 1 : 1; - newUpBound = newLocVal + off; - } - else { - newLocVal = curGlobalVal; + // Calculate new values for local counter, global counter and upper bound. + if (curLocVal + l >= curGlobalVal) { + newLocVal = curLocVal + l; - newUpBound = newLocVal + off; - } + newUpBound = newLocVal + off; + } + else { + newLocVal = curGlobalVal; - locVal = newLocVal; - upBound = newUpBound; + newUpBound = newLocVal + off; + } - if (updated) - curLocVal = newLocVal; + locVal = newLocVal; + upBound = newUpBound; + + if (updated) + curLocVal = newLocVal; + } + finally { + localUpdate.unlock(); + } // Global counter must be more than reserved upper bound. seq.set(newUpBound + 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/c08849cd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java index 945650d..4db9bd3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.datastructures.partitioned; import java.util.Random; import java.util.UUID; +import java.util.concurrent.Callable; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheMode; @@ -26,6 +27,7 @@ import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.internal.processors.cache.datastructures.IgniteAtomicsAbstractTest; import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicSequenceImpl; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -281,6 +283,36 @@ public class GridCachePartitionedAtomicSequenceMultiThreadedTest extends IgniteA } /** + * @throws Exception if failed. + */ + public void testMultipleSequences() throws Exception { + final int seqCnt = 5; + final int threadCnt = 5; + final int incCnt = 1_000; + + final IgniteAtomicSequence[] seqs = new IgniteAtomicSequence[seqCnt]; + + String seqName = UUID.randomUUID().toString(); + + for (int i = 0; i < seqs.length; i++) + seqs[i] = grid(0).atomicSequence(seqName, 0, true); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < incCnt; i++) { + for (IgniteAtomicSequence seq : seqs) + seq.incrementAndGet(); + } + + return null; + } + }, threadCnt, "load"); + + for (IgniteAtomicSequence seq : seqs) + assertEquals(seqCnt * threadCnt * incCnt, seq.get()); + } + + /** * Executes given closure in a given number of threads given number of times. * * @param c Closure to execute.
