Repository: ignite Updated Branches: refs/heads/ignite-single-op-tx a03c16508 -> 6b38cd05c
'Single' operations optimizations for tx cache. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b38cd05 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b38cd05 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b38cd05 Branch: refs/heads/ignite-single-op-tx Commit: 6b38cd05c6c073d8ca1d0493bd8029f3beda2209 Parents: a03c165 Author: sboikov <[email protected]> Authored: Mon Nov 16 14:38:10 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 16 15:07:21 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 46 ++++++++++---------- .../cache/GridCacheSharedContext.java | 11 ++++- .../IgniteTxImplicitSingleStateImpl.java | 5 +++ .../transactions/IgniteTxLocalAdapter.java | 23 ++++++++-- .../cache/transactions/IgniteTxLocalEx.java | 15 ++++++- .../IgniteTxRemoteStateAdapter.java | 5 +++ .../cache/transactions/IgniteTxState.java | 2 + .../cache/transactions/IgniteTxStateImpl.java | 17 ++++++-- 8 files changed, 90 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 49ca1dc..cbb7486 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1854,7 +1854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V V prevVal = syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAllAsync(ctx, F.t(key, val), true, filter).get().value(); + return (V)tx.putAsync(ctx, key, val, true, filter).get().value(); } @Override public String toString() { @@ -1909,7 +1909,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), true, filter) + return tx.putAsync(ctx, key, val, true, filter) .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @@ -2013,10 +2013,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = - Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); - - IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, invokeMap, args); + IgniteInternalFuture<GridCacheReturn> fut = + tx.invokeAsync(ctx, key, (EntryProcessor<K, V, Object>)entryProcessor, args); Map<K, EntryProcessorResult<T>> resMap = fut.get().value(); @@ -2240,8 +2238,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), false, filter).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); + return tx.putAsync(ctx, key, val, false, filter).chain( + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @Override public String toString() { @@ -2275,7 +2273,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAllAsync(ctx, F.t(key, val), true, ctx.noValArray()).get().value(); + return (V)tx.putAsync(ctx, key, val, true, ctx.noValArray()).get().value(); } @Override public String toString() { @@ -2299,8 +2297,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), true, ctx.noValArray()) - .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); + return tx.putAsync(ctx, key, val, true, ctx.noValArray()) + .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @Override public String toString() { @@ -2329,7 +2327,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Boolean stored = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAllAsync(ctx, F.t(key, val), false, ctx.noValArray()).get().success(); + return tx.putAsync(ctx, key, val, false, ctx.noValArray()).get().success(); } @Override public String toString() { @@ -2358,7 +2356,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), false, ctx.noValArray()).chain( + return tx.putAsync(ctx, key, val, false, ctx.noValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @@ -2384,7 +2382,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAllAsync(ctx, F.t(key, val), true, ctx.hasValArray()).get().value(); + return (V)tx.putAsync(ctx, key, val, true, ctx.hasValArray()).get().value(); } @Override public String toString() { @@ -2408,7 +2406,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), true, ctx.hasValArray()).chain( + return tx.putAsync(ctx, key, val, true, ctx.hasValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @@ -2434,7 +2432,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAllAsync(ctx, F.t(key, val), false, ctx.hasValArray()).get().success(); + return tx.putAsync(ctx, key, val, false, ctx.hasValArray()).get().success(); } @Override public String toString() { @@ -2454,7 +2452,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), false, ctx.hasValArray()).chain( + return tx.putAsync(ctx, key, val, false, ctx.hasValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } @@ -2481,7 +2479,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return tx.putAllAsync(ctx, F.t(key, newVal), false, ctx.equalsValArray(oldVal)).get() + return tx.putAsync(ctx, key, newVal, false, ctx.equalsValArray(oldVal)).get() .success(); } @@ -2518,7 +2516,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } } - return tx.putAllAsync(ctx, F.t(key, newVal), false, ctx.equalsValArray(oldVal)).chain( + return tx.putAsync(ctx, key, newVal, false, ctx.equalsValArray(oldVal)).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @@ -2883,8 +2881,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return tx.putAllAsync(ctx, - F.t(key, newVal), + return tx.putAsync(ctx, + key, + newVal, true, ctx.equalsValArray(oldVal)).get(); } @@ -2945,8 +2944,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return new GridFinishedFuture<>(e); } - return (IgniteInternalFuture)tx.putAllAsync(ctx, - F.t(key, newVal), + return (IgniteInternalFuture)tx.putAsync(ctx, + key, + newVal, true, ctx.equalsValArray(oldVal)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 5321bb3..4293b90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -591,10 +591,17 @@ public class GridCacheSharedContext<K, V> { * @param tx Transaction to commit. * @return Commit future. */ + @SuppressWarnings("unchecked") public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(IgniteInternalTx tx) { - tx.txState().awaitLastFut(this); + GridCacheContext ctx = tx.txState().singleCacheContext(this); - return tx.commitAsync(); + if (ctx == null) { + tx.txState().awaitLastFut(this); + + return tx.commitAsync(); + } + else + return ctx.cache().commitTxAsync(tx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java index 1b99159..c4012e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -58,6 +58,11 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ + @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) { + return cacheCtx; + } + + /** {@inheritDoc} */ @Nullable @Override public Integer firstCacheId() { return cacheCtx != null ? cacheCtx.cacheId() : null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d22d6f4..895bcd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1922,7 +1922,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter V val, boolean retval, CacheEntryPredicate[] filter) { - return putAsync0(cacheCtx, key, val, retval, filter); + return putAsync0(cacheCtx, key, val, null, null, retval, filter); + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx, + K key, + EntryProcessor<K, V, Object> entryProcessor, + Object... invokeArgs) { + return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, entryProcessor, invokeArgs, true, null); } /** {@inheritDoc} */ @@ -2015,6 +2023,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final boolean needVal = singleRmv || retval || hasFilters; final boolean needReadVer = needVal && (serializable() && optimistic()); + if (entryProcessor != null) + transform = true; + boolean loadMissed = enlistWriteEntry(cacheCtx, cacheKey, val, @@ -2847,6 +2858,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param cacheCtx Cache context. * @param key Key. * @param val Value. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for EntryProcessor. * @param retval Return value flag. * @param filter Filter. * @return Operation future. @@ -2854,7 +2867,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter private <K, V> IgniteInternalFuture putAsync0( final GridCacheContext cacheCtx, K key, - V val, + @Nullable V val, + @Nullable EntryProcessor entryProcessor, + @Nullable final Object[] invokeArgs, final boolean retval, @Nullable final CacheEntryPredicate[] filter ) { @@ -2874,8 +2889,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cacheKey, val, opCtx != null ? opCtx.expiry() : null, - null, - null, + entryProcessor, + invokeArgs, retval, /*lockOnly*/false, filter, http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index f9555cc..5dc3338 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -108,9 +108,22 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { /** * @param cacheCtx Cache context. + * @param key Key. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for entry processor. + * @return Operation future. + */ + public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync( + GridCacheContext cacheCtx, + K key, + EntryProcessor<K, V, Object> entryProcessor, + Object... invokeArgs); + + /** + * @param cacheCtx Cache context. * @param map Entry processors map. * @param invokeArgs Optional arguments for entry processor. - * @return Transform operation future. + * @return Operation future. */ public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync( GridCacheContext cacheCtx, http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java index 9be37e1..5b15296 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java @@ -99,6 +99,11 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState } /** {@inheritDoc} */ + @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) { + return null; + } + + /** {@inheritDoc} */ @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) { assert false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java index cb9d93d..4965c29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java @@ -44,6 +44,8 @@ public interface IgniteTxState { */ @Nullable public Integer firstCacheId(); + @Nullable GridCacheContext singleCacheContext(GridCacheSharedContext cctx); + /** * * @param cctx http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index 1e12fe4..d80cef9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -69,11 +69,20 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ - @Override public void awaitLastFut(GridCacheSharedContext cctx) { - if (activeCacheIds.size() > 1) { - for (Integer cacheId : activeCacheIds) - cctx.cacheContext(cacheId).cache().awaitLastFut(); + @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) { + if (activeCacheIds.size() == 1) { + int cacheId = F.first(activeCacheIds); + + return cctx.cacheContext(cacheId); } + + return null; + } + + /** {@inheritDoc} */ + @Override public void awaitLastFut(GridCacheSharedContext cctx) { + for (Integer cacheId : activeCacheIds) + cctx.cacheContext(cacheId).cache().awaitLastFut(); } /** {@inheritDoc} */
