Repository: ignite Updated Branches: refs/heads/ignite-comm-balance-master 6b3b60d21 -> 65803acad
reverted future Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/65803aca Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/65803aca Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/65803aca Branch: refs/heads/ignite-comm-balance-master Commit: 65803acada22b85c0c5d9ea759de178e1a825cca Parents: 6b3b60d Author: Yakov Zhdanov <[email protected]> Authored: Mon Feb 6 17:24:08 2017 +0700 Committer: Yakov Zhdanov <[email protected]> Committed: Mon Feb 6 17:24:08 2017 +0700 ---------------------------------------------------------------------- .../internal/util/future/GridFutureAdapter.java | 444 +++++++------------ 1 file changed, 170 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/65803aca/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index bd11301..723dff7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -17,16 +17,18 @@ package org.apache.ignite.internal.util.future; +import java.util.Arrays; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -36,58 +38,32 @@ import org.jetbrains.annotations.Nullable; /** * Future adapter. - * TODO: - * 1. remove serializable - * 2. remove start time, end time. - * 3. remove ignore interrupts flag. */ -public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { - /* - * https://bugs.openjdk.java.net/browse/JDK-8074773 - */ - static { - Class<?> ensureLoaded = LockSupport.class; - } - - /** - * Future state. - */ - private enum State { - INIT, - - CANCELLED, - } - - /** - * - */ - static final class WaitNode { - /** */ - private final Object waiter; +public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R> { + /** */ + private static final long serialVersionUID = 0L; - /** */ - private volatile Object next; + /** Initial state. */ + private static final int INIT = 0; - /** - * @param waiter Waiter. - */ - WaitNode(Object waiter) { - this.waiter = waiter; - } - } + /** Cancelled state. */ + private static final int CANCELLED = 1; - /** State updater. */ - private static final AtomicReferenceFieldUpdater<GridFutureAdapter, Object> stateUpd = - AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state"); + /** Done state. */ + private static final int DONE = 2; /** */ - private static final long serialVersionUID = 0L; + private static final byte ERR = 1; /** */ - private boolean ignoreInterrupts; + private static final byte RES = 2; /** */ - private volatile Object state = State.INIT; + private byte resFlag; + + /** Result. */ + @GridToStringInclude(sensitive = true) + private Object res; /** Future start time. */ private final long startTime = U.currentTimeMillis(); @@ -95,6 +71,13 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { /** Future end time. */ private volatile long endTime; + /** */ + private boolean ignoreInterrupts; + + /** */ + @GridToStringExclude + private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr; + /** {@inheritDoc} */ @Override public long startTime() { return startTime; @@ -123,16 +106,12 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { /** {@inheritDoc} */ @Override public Throwable error() { - Object state0 = state; - - return (state0 instanceof Throwable) ? (Throwable)state0 : null; + return (resFlag == ERR) ? (Throwable)res : null; } /** {@inheritDoc} */ @Override public R result() { - Object state0 = state; - - return isDone(state0) && !(state0 instanceof Throwable) ? (R)state0 : null; + return resFlag == RES ? (R)res : null; } /** {@inheritDoc} */ @@ -174,36 +153,28 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { * @throws IgniteCheckedException If failed. */ private R get0(boolean ignoreInterrupts) throws IgniteCheckedException { - Object res = registerWaiter(Thread.currentThread()); - - if (res != State.INIT) { - // no registration was done since a value is available. - return resolve(res); - } - - boolean interrupted = false; - try { - for (; ; ) { - LockSupport.park(); + if (endTime == 0) { + if (ignoreInterrupts) + acquireShared(0); + else + acquireSharedInterruptibly(0); + } - if (isDone()) - return resolve(state); + if (getState() == CANCELLED) + throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this); - else if (Thread.interrupted()) { - interrupted = true; + assert resFlag != 0; - if (!ignoreInterrupts) { - unregisterWaiter(Thread.currentThread()); + if (resFlag == ERR) + throw U.cast((Throwable)res); - throw new IgniteInterruptedCheckedException("Thread has been interrupted."); - } - } - } + return (R)res; } - finally { - if (interrupted) - Thread.currentThread().interrupt(); + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); } } @@ -215,183 +186,46 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { * @throws IgniteCheckedException If error occurred. */ @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException { - Object res = registerWaiter(Thread.currentThread()); - - if (res != State.INIT) - return resolve(res); - - long deadlineNanos = System.nanoTime() + nanosTimeout; - - boolean interrupted = false; - - try { - long nanosTimeout0 = nanosTimeout; + if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout)) + throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed."); - while (nanosTimeout0 > 0) { - LockSupport.parkNanos(nanosTimeout0); - - nanosTimeout0 = deadlineNanos - System.nanoTime(); - - if (isDone()) - return resolve(state); - - else if (Thread.interrupted()) { - interrupted = true; - - if (!ignoreInterrupts) - throw new IgniteInterruptedCheckedException("Thread has been interrupted."); - } - } - } - finally { - if (interrupted) - Thread.currentThread().interrupt(); - - unregisterWaiter(Thread.currentThread()); - } - - throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed."); - } - - /** - * Resolves the value to result or exception. - * - * @param val Value to resolve. - * @return Result. - * @throws IgniteCheckedException If resolved to exception. - */ - private R resolve(Object val) throws IgniteCheckedException { - if (val == State.CANCELLED) + if (getState() == CANCELLED) throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this); - if (val instanceof Throwable) - throw U.cast((Throwable)val); - - return (R)val; - } - - /** - * @param waiter Waiter to register. - * @return Previous state. - */ - private Object registerWaiter(Object waiter) { - WaitNode waitNode = null; - - for (; ; ) { - final Object oldState = state; - - if (isDone(oldState)) - return oldState; - - Object newState; + assert resFlag != 0; - if (oldState == State.INIT) - newState = waiter; + if (resFlag == ERR) + throw U.cast((Throwable)res); - else { - if (waitNode == null) - waitNode = new WaitNode(waiter); - - waitNode.next = oldState; - - newState = waitNode; - } - - if (compareAndSetState(oldState, newState)) - return State.INIT; - } + return (R)res; } - /** - * @param waiter Waiter to unregister. - */ - void unregisterWaiter(Thread waiter) { - WaitNode prev = null; - Object cur = state; + /** {@inheritDoc} */ + @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0) { + assert lsnr0 != null; - while (cur != null) { - Object curWaiter = cur.getClass() == WaitNode.class ? ((WaitNode)cur).waiter : cur; - Object next = cur.getClass() == WaitNode.class ? ((WaitNode)cur).next : null; + boolean done = isDone(); - if (curWaiter == waiter) { - if (prev == null) { - Object n = next == null ? State.INIT : next; + if (!done) { + synchronized (this) { + done = isDone(); // Double check. - cur = compareAndSetState(cur, n) ? null : state; - } - else { - prev.next = next; + if (!done) { + if (lsnr == null) + lsnr = lsnr0; + else if (lsnr instanceof ArrayListener) + ((ArrayListener)lsnr).add(lsnr0); + else + lsnr = (IgniteInClosure)new ArrayListener<IgniteInternalFuture>(lsnr, lsnr0); - cur = null; + return; } } - else { - prev = cur.getClass() == WaitNode.class ? (WaitNode)cur : null; - - cur = next; - } - } - } - - /** - * @param waiter Head of waiters queue to unblock. - */ - private void unblockAll(Object waiter) { - while (waiter != null) { - if (waiter instanceof Thread) { - LockSupport.unpark((Thread)waiter); - - return; - } - else if (waiter instanceof IgniteInClosure) { - notifyListener((IgniteInClosure<? super IgniteInternalFuture<R>>)waiter); - - return; - } - else if (waiter.getClass() == WaitNode.class) { - WaitNode waitNode = (WaitNode) waiter; - - unblockAll(waitNode.waiter); - - waiter = waitNode.next; - } - else - return; } - } - public void unblockAllThreads() { - unblockFirstThread0(state); - } + assert done; - /** - * @param waiter Head of waiters queue to unblock. - */ - private void unblockFirstThread0(Object waiter) { - while (waiter != null) { - if (waiter instanceof Thread) { - LockSupport.unpark((Thread)waiter); - - return; - } - else if (waiter.getClass() == WaitNode.class) { - WaitNode waitNode = (WaitNode) waiter; - - unblockFirstThread0(waitNode.waiter); - - waiter = waitNode.next; - } - else - return; - } - } - - /** {@inheritDoc} */ - @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> newLsnr) { - Object res = registerWaiter(newLsnr); - - if (res != State.INIT) - notifyListener(newLsnr); + notifyListener(lsnr0); } /** {@inheritDoc} */ @@ -406,10 +240,23 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { } /** - * @return Logger instance. + * Notifies all registered listeners. */ - @Nullable public IgniteLogger logger() { - return null; + private void notifyListeners() { + IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0; + + synchronized (this) { + lsnr0 = lsnr; + + if (lsnr0 == null) + return; + + lsnr = null; + } + + assert lsnr0 != null; + + notifyListener(lsnr0); } /** @@ -446,19 +293,9 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { /** {@inheritDoc} */ @Override public boolean isDone() { - return isDone(state); - } - - /** - * @param state State to check. - * @return {@code True} if future is done. - */ - private boolean isDone(Object state) { - return state == null || - !(state == State.INIT - || state.getClass() == WaitNode.class - || state instanceof Thread - || state instanceof IgniteInClosure); + // Don't check for "valid" here, as "done" flag can be read + // even in invalid state. + return endTime != 0; } /** @@ -466,12 +303,12 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { */ public boolean isFailed() { // Must read endTime first. - return state instanceof Throwable; + return endTime != 0 && resFlag == ERR; } /** {@inheritDoc} */ @Override public boolean isCancelled() { - return state == State.CANCELLED; + return getState() == CANCELLED; } /** @@ -525,31 +362,32 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { * @return {@code True} if result was set by this call. */ private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) { - Object val = cancel ? State.CANCELLED : err != null ? err : res; - - for (; ; ) { - final Object oldState = state; + boolean notify = false; - if (isDone(oldState)) - return false; + try { + if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) { + if (err != null) { + resFlag = ERR; + this.res = err; + } + else { + resFlag = RES; + this.res = res; + } - if (compareAndSetState(oldState, val)) { - endTime = U.currentTimeMillis(); + notify = true; - unblockAll(oldState); + releaseShared(0); return true; } - } - } - /** - * @param oldState Old state to check. - * @param newState New state to set. - * @return {@code True} if state has been changed (CASed). - */ - private boolean compareAndSetState(Object oldState, Object newState) { - return state == oldState && stateUpd.compareAndSet(this, oldState, newState); + return false; + } + finally { + if (notify) + notifyListeners(); + } } /** @@ -561,18 +399,76 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { return onDone(null, null, true); } + /** {@inheritDoc} */ + @Override protected final int tryAcquireShared(int ignore) { + return endTime != 0 ? 1 : -1; + } + + /** {@inheritDoc} */ + @Override protected final boolean tryReleaseShared(int ignore) { + endTime = U.currentTimeMillis(); + + // Always signal after setting final done status. + return true; + } + /** * @return String representation of state. */ private String state() { - Object s = state; + int s = getState(); - return s instanceof State ? s.toString() : isDone(s) ? "DONE" : "INIT"; + return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE"; + } + + /** + * @return Logger instance. + */ + @Nullable public IgniteLogger logger() { + return null; } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridFutureAdapter.class, this, "state", state(), "hash", System.identityHashCode(this)); + return S.toString(GridFutureAdapter.class, this, "state", state()); + } + + /** + * + */ + private static class ArrayListener<R> implements IgniteInClosure<IgniteInternalFuture<R>> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteInClosure<? super IgniteInternalFuture<R>>[] arr; + + /** + * @param lsnrs Listeners. + */ + private ArrayListener(IgniteInClosure... lsnrs) { + this.arr = lsnrs; + } + + /** {@inheritDoc} */ + @Override public void apply(IgniteInternalFuture<R> fut) { + for (int i = 0; i < arr.length; i++) + arr[i].apply(fut); + } + + /** + * @param lsnr Listener. + */ + void add(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) { + arr = Arrays.copyOf(arr, arr.length + 1); + + arr[arr.length - 1] = lsnr; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ArrayListener.class, this, "arrSize", arr.length); + } } /**
