IGNITE-1678 first implementation (fully not working)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/839f43ad Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/839f43ad Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/839f43ad Branch: refs/heads/ignite-1678 Commit: 839f43adf80a04e4a2778983560031592a20a89e Parents: 4a1d0c2 Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Sun Aug 26 14:45:49 2018 +0300 Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Committed: Sun Aug 26 14:45:49 2018 +0300 ---------------------------------------------------------------------- .../configuration/AtomicConfiguration.java | 2 +- .../GridCacheAtomicSequenceImpl.java | 270 ++++++++++--------- 2 files changed, 147 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/839f43ad/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java index 8d0e0be..5dfd1e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java @@ -38,7 +38,7 @@ public class AtomicConfiguration { public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE = 1000; /** Default atomic sequence reservation size. */ - public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_PERCENTAGE = 70; + public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_PERCENTAGE = 80; /** Default batch size for all cache's sequences. */ private int seqReserveSize = DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE; http://git-wip-us.apache.org/repos/asf/ignite/blob/839f43ad/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 61e9262..c69d483 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -27,21 +27,20 @@ import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.lang.IgniteBiTuple; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -50,7 +49,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA * Cache sequence implementation. */ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<GridCacheAtomicSequenceValue> - implements GridCacheAtomicSequenceEx, IgniteChangeGlobalStateSupport, Externalizable { + implements GridCacheAtomicSequenceEx, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -73,22 +72,25 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< private volatile int batchSize; /** Reservation percentage. */ - private int reservePercentage; + private volatile int reservePercentage; - /** Synchronization lock for local value updates. */ - private final Lock localUpdate = new ReentrantLock(); + /** Reserved bottom bound of local counter (included). */ + private volatile long reservedBottomBound; + + /** Reserved upper bound of local counter (not included). */ + private volatile long reservedUpBound; - /** Synchronization for distributed sequence update. Acquired by threads with free topology (not in TX). */ - private final ReentrantLock distUpdateFreeTop = new ReentrantLock(); + /** A limit after which a new reservation should be done. */ + private volatile long newReservationLine; - /** Synchronization for distributed sequence update. Acquired by threads with locked topology (inside TX). */ - private final ReentrantLock distUpdateLockedTop = new ReentrantLock(); + /** Reservation future. */ + private volatile IgniteInternalFuture<?> reservationFut; - /** Callable for execution {@link #incrementAndGet} operation in async and sync mode. */ - private final Callable<Long> incAndGetCall = internalUpdate(1, true); + /** Reservation pool. */ + private final byte poolPlc = GridIoPolicy.SYSTEM_POOL; - /** Callable for execution {@link #getAndIncrement} operation in async and sync mode. */ - private final Callable<Long> getAndIncCall = internalUpdate(1, false); + /** Synchronization lock for local value updates. */ + private final Lock localUpdateLock = new ReentrantLock(); /** * Empty constructor required by {@link Externalizable}. @@ -125,6 +127,11 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< this.reservePercentage = reservePercentage; this.upBound = upBound; this.locVal = locVal; + + reservedBottomBound = locVal; + reservedUpBound = upBound; + // Calculate next reservation bound. + newReservationLine = calculateNewReservationLine(locVal); } /** {@inheritDoc} */ @@ -137,7 +144,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< /** {@inheritDoc} */ @Override public long incrementAndGet() { try { - return internalUpdate(1, incAndGetCall, true); + return internalUpdate(1, true); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -147,7 +154,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< /** {@inheritDoc} */ @Override public long getAndIncrement() { try { - return internalUpdate(1, getAndIncCall, false); + return internalUpdate(1, false); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -159,7 +166,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< A.ensure(l > 0, " Parameter mustn't be less then 1: " + l); try { - return internalUpdate(l, null, true); + return internalUpdate(l, true); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -171,7 +178,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< A.ensure(l > 0, " Parameter mustn't be less then 1: " + l); try { - return internalUpdate(l, null, false); + return internalUpdate(l, false); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -182,99 +189,121 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< * Synchronous sequence update operation. Will add given amount to the sequence value. * * @param l Increment amount. - * @param updateCall Cache call that will update sequence reservation count in accordance with l. * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value * prior to update. * @return Sequence value. * @throws IgniteCheckedException If update failed. */ @SuppressWarnings("SignalWithoutCorrespondingAwait") - private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException { - checkRemoved(); - + private long internalUpdate(long l, boolean updated) throws IgniteCheckedException { assert l > 0; - localUpdate.lock(); + while (true){ + checkRemoved(); - try { - // If reserved range isn't exhausted. - long locVal0 = locVal; + localUpdateLock.lock(); - if (locVal0 + l <= upBound) { - locVal = locVal0 + l; + IgniteInternalFuture<?> reservation = reservationFut; - return updated ? locVal0 + l : locVal0; - } - } - finally { - localUpdate.unlock(); - } + try { + boolean reservationInProgress = reservation != null; - AffinityTopologyVersion lockedVer = ctx.shared().lockedTopologyVersion(null); + long newLocalVal = locVal + l; - // We need two separate locks here because two independent thread may attempt to update the sequence - // simultaneously, one thread with locked topology and other with unlocked. - // We cannot use the same lock for both cases because it leads to a deadlock when free-topology thread - // waits for topology change, and locked topology thread waits to acquire the lock. - // If a thread has locked topology, it must bypass sync with non-locked threads, but at the same time - // we do not want multiple threads to attempt to run identical cache updates. - ReentrantLock distLock = lockedVer == null ? distUpdateFreeTop : distUpdateLockedTop; + // Reserve new interval if operation is not in progress. + if (newLocalVal >= newReservationLine && newLocalVal <= reservedUpBound && !reservationInProgress){ + reservationFut = runAsyncReservation(0); + } - distLock.lock(); + long locVal0 = locVal; - try { - if (updateCall == null) - updateCall = internalUpdate(l, updated); + if (newLocalVal <= upBound) { + locVal = newLocalVal; - try { - return CU.retryTopologySafe(updateCall); - } - catch (IgniteCheckedException | IgniteException | IllegalStateException e) { - throw e; + return updated ? newLocalVal : locVal0; + } + + // Await complete previous reservation. + if (reservationInProgress){ + reservation.get(); + + reservationFut = null; + + // Retry check bounds. + continue; + } + + // Still in reserved interval. + if (newLocalVal < reservedUpBound) { + long curVal = locVal; + + if (newLocalVal < reservedBottomBound) + locVal = reservedBottomBound; + else + locVal += l; + + upBound = reservedUpBound; + + return updated ? locVal : curVal; + } + // Switched to the next interval. New value more that upper reserved bound. + else if (!reservationInProgress) { + long diff = newLocalVal - reservedUpBound; + + // Calculate how many batch size included in l. + // It will our offset for global seq counter. + long off = (diff / batchSize) * batchSize; + + reservationFut = runAsyncReservation(off); + + // Can not wait async, should wait under lock until new interval reserved. + reservationFut.get(); + + reservationFut = null; + } } - catch (Exception e) { - throw new IgniteCheckedException(e); + finally { + localUpdateLock.unlock(); } } - finally { - distLock.unlock(); - } } - /** Get local batch size for this sequences. - * - * @return Sequence batch size. - */ + /** {@inheritDoc} */ @Override public int batchSize() { return batchSize; } - /** - * Set local batch size for this sequences. - * - * @param size Sequence batch size. Must be more then 0. - */ + /** {@inheritDoc} */ @Override public void batchSize(int size) { A.ensure(size > 0, " Batch size can't be less then 0: " + size); - localUpdate.lock(); + localUpdateLock.lock(); try { batchSize = size; } finally { - localUpdate.unlock(); + localUpdateLock.unlock(); } } /** {@inheritDoc} */ @Override public int reservePercentage() { - return 0; + return reservePercentage; } /** {@inheritDoc} */ @Override public void reservePercentage(int percentage) { + A.ensure(percentage >= 0 && percentage <= 100, "Invalid reserve percentage: " + percentage); + localUpdateLock.lock(); + + try { + reservePercentage = percentage; + } + finally { + localUpdateLock.unlock(); + } } /** {@inheritDoc} */ @@ -297,80 +326,64 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< } /** - * Method returns callable for execution all update operations in async and sync mode. + * Runs async reservation of new range for current node. * - * @param l Value will be added to sequence. - * @param updated If {@code true}, will return updated value, if {@code false}, will return previous value. - * @return Callable for execution in async and sync mode. + * @param off Offset. + * @return Future. */ - @SuppressWarnings("TooBroadScope") - private Callable<Long> internalUpdate(final long l, final boolean updated) { - return new Callable<Long>() { - @Override public Long call() throws Exception { - assert distUpdateFreeTop.isHeldByCurrentThread() || distUpdateLockedTop.isHeldByCurrentThread(); - - try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicSequenceValue seq = cacheView.get(key); - - checkRemoved(); - - assert seq != null; + private IgniteInternalFuture<?> runAsyncReservation(final long off) { + assert off >= 0 : off; - long curLocVal; + GridFutureAdapter<?> resFut = new GridFutureAdapter<>(); - long newUpBound; + resFut.listen(f -> { + if (f.error() == null) + reservationFut = null; + }); - // Even though we hold a transaction lock here, we must hold the local update lock here as well - // because we mutate multipe variables (locVal and upBound). - localUpdate.lock(); + ctx.kernalContext().closure().runLocalSafe(() -> { + Callable<Void> reserveCall = reserveCallable(off); - try { - curLocVal = locVal; - - // If local range was already reserved in another thread. - if (curLocVal + l <= upBound) { - locVal = curLocVal + l; + try { + CU.retryTopologySafe(reserveCall); - return updated ? curLocVal + l : curLocVal; - } + resFut.onDone(); + } + catch (Throwable h) { + resFut.onDone(h); + } + }, poolPlc); - long curGlobalVal = seq.get(); + return resFut; + } - long newLocVal; + /** + * @param off Reservation offset. + * @return Callable for reserved new interval. + */ + private Callable<Void> reserveCallable(long off){ + return new Callable<Void>() { + @Override public Void call() throws Exception { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicSequenceValue seq = cacheView.get(key); - /* We should use offset because we already reserved left side of range.*/ - long off = batchSize > 1 ? batchSize - 1 : 1; + checkRemoved(); - // Calculate new values for local counter, global counter and upper bound. - if (curLocVal + l >= curGlobalVal) { - newLocVal = curLocVal + l; + assert seq != null; - newUpBound = newLocVal + off; - } - else { - newLocVal = curGlobalVal; + long curGlobalVal = seq.get(); - newUpBound = newLocVal + off; - } + reservedBottomBound = curGlobalVal + off; - locVal = newLocVal; - upBound = newUpBound; + reservedUpBound = reservedBottomBound + batchSize; - if (updated) - curLocVal = newLocVal; - } - finally { - localUpdate.unlock(); - } + newReservationLine = calculateNewReservationLine(reservedBottomBound); - // Global counter must be more than reserved upper bound. - seq.set(newUpBound + 1); + seq.set(reservedUpBound); cacheView.put(key, seq); tx.commit(); - - return curLocVal; } catch (Error | Exception e) { if(!X.hasCause(e, ClusterTopologyCheckedException.class)) @@ -378,10 +391,19 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< throw e; } + + return null; } }; } + /** + * @return New reservation line. + */ + private long calculateNewReservationLine(long initialValue) { + return initialValue + (batchSize * reservePercentage / 100); + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx.kernalContext());