Repository: ignite
Updated Branches:
  refs/heads/ignite-single-op-tx a03c16508 -> 6b38cd05c


'Single' operations optimizations for tx cache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b38cd05
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b38cd05
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b38cd05

Branch: refs/heads/ignite-single-op-tx
Commit: 6b38cd05c6c073d8ca1d0493bd8029f3beda2209
Parents: a03c165
Author: sboikov <[email protected]>
Authored: Mon Nov 16 14:38:10 2015 +0300
Committer: sboikov <[email protected]>
Committed: Mon Nov 16 15:07:21 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 46 ++++++++++----------
 .../cache/GridCacheSharedContext.java           | 11 ++++-
 .../IgniteTxImplicitSingleStateImpl.java        |  5 +++
 .../transactions/IgniteTxLocalAdapter.java      | 23 ++++++++--
 .../cache/transactions/IgniteTxLocalEx.java     | 15 ++++++-
 .../IgniteTxRemoteStateAdapter.java             |  5 +++
 .../cache/transactions/IgniteTxState.java       |  2 +
 .../cache/transactions/IgniteTxStateImpl.java   | 17 ++++++--
 8 files changed, 90 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 49ca1dc..cbb7486 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1854,7 +1854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         V prevVal = syncOp(new SyncOp<V>(true) {
             @Override public V op(IgniteTxLocalAdapter tx) throws 
IgniteCheckedException {
-                return (V)tx.putAllAsync(ctx, F.t(key, val), true, 
filter).get().value();
+                return (V)tx.putAsync(ctx, key, val, true, 
filter).get().value();
             }
 
             @Override public String toString() {
@@ -1909,7 +1909,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         return asyncOp(new AsyncOp<V>() {
             @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter 
tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), true, filter)
+                return tx.putAsync(ctx, key, val, true, filter)
                     
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
             }
 
@@ -2013,10 +2013,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
             @Nullable @Override public EntryProcessorResult<T> 
op(IgniteTxLocalAdapter tx)
                 throws IgniteCheckedException {
-                Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
-                    Collections.singletonMap(key, (EntryProcessor<K, V, 
Object>) entryProcessor);
-
-                IgniteInternalFuture<GridCacheReturn> fut = 
tx.invokeAsync(ctx, invokeMap, args);
+                IgniteInternalFuture<GridCacheReturn> fut =
+                    tx.invokeAsync(ctx, key, (EntryProcessor<K, V, 
Object>)entryProcessor, args);
 
                 Map<K, EntryProcessorResult<T>> resMap = fut.get().value();
 
@@ -2240,8 +2238,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         return asyncOp(new AsyncOp<Boolean>() {
             @Override public IgniteInternalFuture<Boolean> 
op(IgniteTxLocalAdapter tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), false, filter).chain(
-                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, 
Boolean>) RET2FLAG);
+                return tx.putAsync(ctx, key, val, false, filter).chain(
+                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, 
Boolean>)RET2FLAG);
             }
 
             @Override public String toString() {
@@ -2275,7 +2273,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         return syncOp(new SyncOp<V>(true) {
             @Override public V op(IgniteTxLocalAdapter tx) throws 
IgniteCheckedException {
-                return (V)tx.putAllAsync(ctx, F.t(key, val), true, 
ctx.noValArray()).get().value();
+                return (V)tx.putAsync(ctx, key, val, true, 
ctx.noValArray()).get().value();
             }
 
             @Override public String toString() {
@@ -2299,8 +2297,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
             @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter 
tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), true, 
ctx.noValArray())
-                    
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
+                return tx.putAsync(ctx, key, val, true, ctx.noValArray())
+                    
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
             }
 
             @Override public String toString() {
@@ -2329,7 +2327,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         Boolean stored = syncOp(new SyncOp<Boolean>(true) {
             @Override public Boolean op(IgniteTxLocalAdapter tx) throws 
IgniteCheckedException {
-                return tx.putAllAsync(ctx, F.t(key, val), false, 
ctx.noValArray()).get().success();
+                return tx.putAsync(ctx, key, val, false, 
ctx.noValArray()).get().success();
             }
 
             @Override public String toString() {
@@ -2358,7 +2356,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
             @Override public IgniteInternalFuture<Boolean> 
op(IgniteTxLocalAdapter tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), false, 
ctx.noValArray()).chain(
+                return tx.putAsync(ctx, key, val, false, 
ctx.noValArray()).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, 
Boolean>)RET2FLAG);
             }
 
@@ -2384,7 +2382,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         return syncOp(new SyncOp<V>(true) {
             @Override public V op(IgniteTxLocalAdapter tx) throws 
IgniteCheckedException {
-                return (V)tx.putAllAsync(ctx, F.t(key, val), true, 
ctx.hasValArray()).get().value();
+                return (V)tx.putAsync(ctx, key, val, true, 
ctx.hasValArray()).get().value();
             }
 
             @Override public String toString() {
@@ -2408,7 +2406,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
             @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter 
tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), true, 
ctx.hasValArray()).chain(
+                return tx.putAsync(ctx, key, val, true, 
ctx.hasValArray()).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, 
V>)RET2VAL);
             }
 
@@ -2434,7 +2432,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         return syncOp(new SyncOp<Boolean>(true) {
             @Override public Boolean op(IgniteTxLocalAdapter tx) throws 
IgniteCheckedException {
-                return tx.putAllAsync(ctx, F.t(key, val), false, 
ctx.hasValArray()).get().success();
+                return tx.putAsync(ctx, key, val, false, 
ctx.hasValArray()).get().success();
             }
 
             @Override public String toString() {
@@ -2454,7 +2452,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         return asyncOp(new AsyncOp<Boolean>() {
             @Override public IgniteInternalFuture<Boolean> 
op(IgniteTxLocalAdapter tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), false, 
ctx.hasValArray()).chain(
+                return tx.putAsync(ctx, key, val, false, 
ctx.hasValArray()).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, 
Boolean>) RET2FLAG);
             }
 
@@ -2481,7 +2479,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 if (ctx.deploymentEnabled())
                     ctx.deploy().registerClass(oldVal);
 
-                return tx.putAllAsync(ctx, F.t(key, newVal), false, 
ctx.equalsValArray(oldVal)).get()
+                return tx.putAsync(ctx, key, newVal, false, 
ctx.equalsValArray(oldVal)).get()
                     .success();
             }
 
@@ -2518,7 +2516,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                     }
                 }
 
-                return tx.putAllAsync(ctx, F.t(key, newVal), false, 
ctx.equalsValArray(oldVal)).chain(
+                return tx.putAsync(ctx, key, newVal, false, 
ctx.equalsValArray(oldVal)).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, 
Boolean>)RET2FLAG);
             }
 
@@ -2883,8 +2881,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 if (ctx.deploymentEnabled())
                     ctx.deploy().registerClass(oldVal);
 
-                return tx.putAllAsync(ctx,
-                        F.t(key, newVal),
+                return tx.putAsync(ctx,
+                        key,
+                        newVal,
                         true,
                         ctx.equalsValArray(oldVal)).get();
             }
@@ -2945,8 +2944,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                     return new GridFinishedFuture<>(e);
                 }
 
-                return (IgniteInternalFuture)tx.putAllAsync(ctx,
-                    F.t(key, newVal),
+                return (IgniteInternalFuture)tx.putAsync(ctx,
+                    key,
+                    newVal,
                     true,
                     ctx.equalsValArray(oldVal));
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 5321bb3..4293b90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -591,10 +591,17 @@ public class GridCacheSharedContext<K, V> {
      * @param tx Transaction to commit.
      * @return Commit future.
      */
+    @SuppressWarnings("unchecked")
     public IgniteInternalFuture<IgniteInternalTx> 
commitTxAsync(IgniteInternalTx tx) {
-        tx.txState().awaitLastFut(this);
+        GridCacheContext ctx = tx.txState().singleCacheContext(this);
 
-        return tx.commitAsync();
+        if (ctx == null) {
+            tx.txState().awaitLastFut(this);
+
+            return tx.commitAsync();
+        }
+        else
+            return ctx.cache().commitTxAsync(tx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 1b99159..c4012e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -58,6 +58,11 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public GridCacheContext 
singleCacheContext(GridCacheSharedContext cctx) {
+        return cacheCtx;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Integer firstCacheId() {
         return cacheCtx != null ? cacheCtx.cacheId() : null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d22d6f4..895bcd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1922,7 +1922,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         V val,
         boolean retval,
         CacheEntryPredicate[] filter) {
-        return putAsync0(cacheCtx, key, val, retval, filter);
+        return putAsync0(cacheCtx, key, val, null, null, retval, filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteInternalFuture<GridCacheReturn> 
invokeAsync(GridCacheContext cacheCtx,
+        K key,
+        EntryProcessor<K, V, Object> entryProcessor,
+        Object... invokeArgs) {
+        return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, 
entryProcessor, invokeArgs, true, null);
     }
 
     /** {@inheritDoc} */
@@ -2015,6 +2023,9 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
             final boolean needVal = singleRmv || retval || hasFilters;
             final boolean needReadVer = needVal && (serializable() && 
optimistic());
 
+            if (entryProcessor != null)
+                transform = true;
+
             boolean loadMissed = enlistWriteEntry(cacheCtx,
                 cacheKey,
                 val,
@@ -2847,6 +2858,8 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
      * @param cacheCtx Cache context.
      * @param key Key.
      * @param val Value.
+     * @param entryProcessor Entry processor.
+     * @param invokeArgs Optional arguments for EntryProcessor.
      * @param retval Return value flag.
      * @param filter Filter.
      * @return Operation future.
@@ -2854,7 +2867,9 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
     private <K, V> IgniteInternalFuture putAsync0(
         final GridCacheContext cacheCtx,
         K key,
-        V val,
+        @Nullable V val,
+        @Nullable EntryProcessor entryProcessor,
+        @Nullable final Object[] invokeArgs,
         final boolean retval,
         @Nullable final CacheEntryPredicate[] filter
     ) {
@@ -2874,8 +2889,8 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                 cacheKey,
                 val,
                 opCtx != null ? opCtx.expiry() : null,
-                null,
-                null,
+                entryProcessor,
+                invokeArgs,
                 retval,
                 /*lockOnly*/false,
                 filter,

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index f9555cc..5dc3338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -108,9 +108,22 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
 
     /**
      * @param cacheCtx Cache context.
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @return Operation future.
+     */
+    public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(
+        GridCacheContext cacheCtx,
+        K key,
+        EntryProcessor<K, V, Object> entryProcessor,
+        Object... invokeArgs);
+
+    /**
+     * @param cacheCtx Cache context.
      * @param map Entry processors map.
      * @param invokeArgs Optional arguments for entry processor.
-     * @return Transform operation future.
+     * @return Operation future.
      */
     public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
         GridCacheContext cacheCtx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index 9be37e1..5b15296 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -99,6 +99,11 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public GridCacheContext 
singleCacheContext(GridCacheSharedContext cctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void onTxEnd(GridCacheSharedContext cctx, 
IgniteInternalTx tx, boolean commit) {
         assert false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index cb9d93d..4965c29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -44,6 +44,8 @@ public interface IgniteTxState {
      */
     @Nullable public Integer firstCacheId();
 
+    @Nullable GridCacheContext singleCacheContext(GridCacheSharedContext cctx);
+
     /**
      *
      * @param cctx

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 1e12fe4..d80cef9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -69,11 +69,20 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void awaitLastFut(GridCacheSharedContext cctx) {
-        if (activeCacheIds.size() > 1) {
-            for (Integer cacheId : activeCacheIds)
-                cctx.cacheContext(cacheId).cache().awaitLastFut();
+    @Nullable @Override public GridCacheContext 
singleCacheContext(GridCacheSharedContext cctx) {
+        if (activeCacheIds.size() == 1) {
+            int cacheId = F.first(activeCacheIds);
+
+            return cctx.cacheContext(cacheId);
         }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void awaitLastFut(GridCacheSharedContext cctx) {
+        for (Integer cacheId : activeCacheIds)
+            cctx.cacheContext(cacheId).cache().awaitLastFut();
     }
 
     /** {@inheritDoc} */

Reply via email to